LuCloud 9 HDFS

LuCloud 9 HDFS

六月 19, 2019

hdfs

概述

源自谷歌提出的文件系统,详见The Google File System中文版。由于实质上HDFS是GFS的一种开源实现,因此这里混谈GFS和HDFS,并在最后介绍二者差异。

Google GFS文件系统,一个面向大规模数据密集型应用的、可伸缩的分布式文件系统。

GFS提供了一套类似传统文件系统的API接口函数,虽然并不是严格按照POSIX等标准API的形式实现的。文件以分层目录的形式组织,用路径名来标识。我们支持常用的操作,如创建新文件、删除文件、打开文件、关闭文件、读和写文件。

分布式文件系统的要求

首先,组件失效被认为是常态事件,而不是意外事件。GFS包括几百甚至几千台普通的廉价设备组装的存储机器,同时被相当数量的客户机访问。 GFS组件的数量和质量导致在事实上,任何给定时间内都有可能发生某些组件无法工作,某些组件无法从它们目前的失效状态中恢复。我们遇到过各种各样的问 题,比如应用程序bug、操作系统的bug、人为失误,甚至还有硬盘、内存、连接器、网络以及电源失效等造成的问题。所以,持续的监控、错误侦测、灾难冗余以及自动恢复的机制必须集成在GFS中

其次,以通常的标准衡量,我们的文件非常巨大。数GB的文件非常普遍。每个文件通常都包含许多应用程序对象,比如web文档。当我们经常需要处 理快速增长的、并且由数亿个对象构成的、数以TB的数据集时,采用管理数亿个KB大小的小文件的方式是非常不明智的,尽管有些文件系统支持这样的管理方式。因此,设计的假设条件和参数,比如I/O操作和Block的尺寸都需要重新考虑。

第三,绝大部分文件的修改是采用在文件尾部追加数据,而不是覆盖原有数据的方式。对文件的随机写入操作在实际中几乎不存在。一旦写完之后,对文 件的操作就只有读,而且通常是按顺序读。大量的数据符合这些特性,比如:数据分析程序扫描的超大的数据集;正在运行的应用程序生成的连续的数据流;存档的 数据;由一台机器生成、另外一台机器处理的中间数据,这些中间数据的处理可能是同时进行的、也可能是后续才处理的。对于这种针对海量文件的访问模式,客户 端对数据块缓存是没有意义的,数据的追加操作是性能优化和原子性保证的主要考量因素。

第四,应用程序和文件系统API的协同设计提高了整个系统的灵活性。比如,我们放松了对GFS一致性模型的要求,这样就减轻了文件系统对应用程 序的苛刻要求,大大简化了GFS的设计。我们引入了原子性的记录追加操作,从而保证多个客户端能够同时进行追加操作,不需要额外的同步操作来保证数据的一 致性。本文后面还有对这些问题的细节的详细讨论。

高性能的稳定网络带宽远比低延迟重要。我们的目标程序绝大部分要求能够高速率的、大批量的处理数据,极少有程序对单一的读写操作有严格的响应时间要求。

架构

一个GFS集群包含一个单独的Master节点、多台 Chunk服务器,并且同时被多个客户端访问,如图1所示。所有的这些机器通常都是普通的Linux机器,运行着用户级别(user-level)的服务进程。我们可以很容易的把Chunk服务器和客户端都放在同一台机器上,前提是机器资源允许,并且我们能够接受不可靠的应用程序代码带来的稳定性降低的风 险。

谷歌三大核心技术(二)Google MapReduce中文版

GFS存储的文件都被分割成固定大小的Chunk。在Chunk创建的时候,Master服务器会给每个Chunk分配一个不变的、全球唯一的 64位的Chunk标识。Chunk服务器把Chunk以linux文件的形式保存在本地硬盘上,并且根据指定的Chunk标识和字节范围来读写块数据。 出于可靠性的考虑,每个块都会复制到多个块服务器上。缺省情况下,我们使用3个存储复制节点,不过用户可以为不同的文件命名空间设定不同的复制级别。

Master节点管理所有的文件系统元数据。这些元数据包括名字空间、访问控制信息、文件和Chunk的映射信息、以及当前Chunk的位置信息。Master节点还管理着系统范围内的活动,比如,Chunk租用管理 、孤儿Chunk的回收、以及Chunk在Chunk服务器之间的迁移。Master节点使用心跳信息周期地和每个Chunk服务器通讯,发送指令到各个Chunk服务器并接收Chunk服务器的状态信息。

Master服务器 *(alex注:注意逻辑的Master节点和物理的Master服务器的区别。后续我们谈的是每个Master服务器的行为,如存储、内存等等,因此我们将全部使用物理名称)*存 储3种主要类型的元数据,包括:文件和Chunk的命名空间、文件和Chunk的对应关系、每个Chunk副本的存放地点。所有的元数据都保存在 Master服务器的内存中。前两种类型的元数据(命名空间、文件和Chunk的对应关系)同时也会以记录变更日志的方式记录在操作系统的系统日志文件 中,日志文件存储在本地磁盘上,同时日志会被复制到其它的远程Master服务器上。采用保存变更日志的方式,我们能够简单可靠的更新Master服务器 的状态,并且不用担心Master服务器崩溃导致数据不一致的风险。Master服务器不会持久保存Chunk位置信息。Master服务器在启动时,或者有新的Chunk服务器加入时,向各个Chunk服务器轮询它们所存储的Chunk的信息。

GFS客户端代码以库的形式被链接到客户程序里。客户端代码实现了GFS文件系统的API接口函数、应用程序与Master节点和Chunk服务器通讯、以及对数据进行读写操作。客户端和Master节点的通信只获取元数据,所有的数据操作都是由客户端直接和Chunk服务器进行交互的。我们不提供POSIX标准的API的功能,因此,GFS API调用不需要深入到Linux vnode级别。

无论是客户端还是Chunk服务器都不需要缓存文件数据。客户端缓存数据几乎没有什么用处,因为大部分程序要么以流的方式读取一个巨大文件,要 么工作集太大根本无法被缓存。无需考虑缓存相关的问题也简化了客户端和整个系统的设计和实现。(不过,客户端会缓存元数据。)Chunk服务器不需要缓存 文件数据的原因是,Chunk以本地文件的方式保存,Linux操作系统的文件系统缓存会把经常访问的数据缓存在内存中。

差异

特性

HDFS High-Throughput Access to Large Data Sets (Files)

实际上是分布式文件系统的要求第五点的体现:

Because HDFS is primarily designed for batch processing rather than interactive processing, data access throughput in HDFS is more important than latency. Also, because applications run on HDFS typically have large data sets, individual files are broken into large blocks (e.g., 64 MB) to allow HDFS to decrease the amount of metadata storage required per file. This provides two advantages: The list of blocks per file will shrink as the size of individual blocks increases, and by keeping large amounts of data sequentially within a block, HDFS provides fast streaming reads of data.

容错

DN容错

  • Block replication To reliably store data in HDFS, file blocks are replicated in this system. In other words, HDFS stores a file as a set of blocks and each block is replicated and distributed across the whole cluster. The replication factor is set by the user and is three by default.
  • Replica placement The placement of replicas is another factor to fulfill the desired fault tolerance in HDFS. Although storing replicas on different nodes (DataNodes) located in different racks across the whole cluster provides more reliability, it is sometimes ignored as the cost of communication between two nodes in different racks is relatively high in comparison with that of different nodes located in the same rack. Therefore, sometimes HDFS compromises its reliability to achieve lower communication costs. For example, for the default replication factor of three, HDFS stores one replica in the same node the original data is stored, one replica on a different node but in the same rack, and one replica on a different node in a different rack to provide three copies of the data [65].
  • Heartbeat and Blockreport messages Heartbeats and Blockreports are periodic messages sent to the NameNode by each DataNode in a cluster. Receipt of a Heartbeat implies that the DataNode is functioning properly, while each Blockreport contains a list of all blocks on a DataNode [65]. The NameNode receives such messages because it is the sole decision maker of all replicas in the system.

NN容错

GFS中

所有的元数据都保存在 Master服务器的内存中。前两种类型的元数据(命名空间、文件和Chunk的对应关系)同时也会以记录变更日志的方式记录在操作系统的系统日志文件 中,日志文件存储在本地磁盘上,同时日志会被复制到其它的远程Master服务器上。采用保存变更日志的方式,我们能够简单可靠的更新Master服务器的状态,并且不用担心Master服务器崩溃导致数据不一致的风险。

操作日志非常重要,我们必须确保日志文件的完整,确保只有在元数据的变化被持久化后,日志才对客户端是可见的。否则,即使Chunk本身没有出现任 何问题,我们仍有可能丢失整个文件系统,或者丢失客户端最近的操作。所以,我们会把日志复制到多台远程机器,并且只有把相应的日志记录写入到本地以及远程 机器的硬盘后,才会响应客户端的操作请求。Master服务器会收集多个日志记录后批量处理,以减少写入磁盘和复制对系统整体性能的影响。

Master服务器在灾难恢复时,通过重演操作日志把文件系统恢复到最近的状态。为了缩短Master启动的时间,我们必须使日志足够小 *(alex注:即重演系统操作的日志量尽量的少)。*Master服务器在日志增长到一定量时对系统状态做一次Checkpoint *(alex注:Checkpoint是一种行为,一种对数据库状态作一次快照的行为),*将所有的状态数据写入一个Checkpoint文件 (alex注:并删除之前的日志文件)。 在灾难恢复的时候,Master服务器就通过从磁盘上读取这个Checkpoint文件,以及重演Checkpoint之后的有限个日志文件就能够恢复系统。Checkpoint文件以压缩B-树形势的数据结构存储,可以直接映射到内存,在用于命名空间查询时无需额外的解析。这大大提高了恢复速度,增强了可用性。

来源:The Google File System中文版

HDFS中

第一部分:目前Hadoop1.0架构的问题

img
单点故障

  • 如果NameNode或者JobTraker关掉,那么整个集群瘫痪。
  • 对于7×24生产环境,是具有极大的风险。
第二部分:常见的HA方案
  • 第一种是可以设置一个NFS的目录,存储fsimage和editlog,存储的是实时数据,这样当namenode挂掉后能够通过fsimage和editlog进行完全恢复。
  • fsimage:它是NameNode启动时对整个文件系统的快照。
  • edits:它是在NameNode启动后,对文件系统的改动序列。

只有在NameNode重启时,edits才会合并到fsimage文件中,从而得到一个文件系统的最新快照。但是在生产环境集群中的NameNode是很少重启的,这意味者当NameNode运行来很长时间后,edits文件会变的很大。在这种情况下就会出现下面这些问题:

  1. edits文件会变的很大,如何去管理这个文件?
  2. NameNode的重启会花费很长的时间,因为有很多改动要合并到fsimage文件上。
  3. 如果NameNode宕掉了,那我们就丢失了很多改动,因为此时的fsimage文件时间戳比较旧。
  • 第二种是设置Secondary Namenode。

因此为了克服这个问题,我们需要一个易于管理的机制来帮助我们减小edits文件的大小和得到一个最新的fsimage文件,这样也会减小在NameNode上的压力。而Secondary NameNode就是为了帮助解决上述问题提出的,它的职责是合并NameNode的edits到fsimage文件中。如图所示:

img

上图的工作原理,我这里也赘述下:

  1. 首先,它定时到NameNode去获取edits,并更新到fsimage上。
  2. 一旦它有新的fsimage文件,它将其拷贝回NameNode上。
  3. NameNode在下次重启时回使用这个新的fsimage文件,从而减少重启的时间。

Secondary NameNode的整个目的在HDFS中提供一个Checkpoint Node,通过阅读官方文档可以清晰的知道,它只是NameNode的一个助手节点,这也是它在社区内被认为是Checkpoint Node的原因。

现在,我们明白Secondary NameNode所做的是在文件系统这设置一个Checkpoint来帮助NameNode更好的工作;它不是取代NameNode,也不是NameNode的备份。

来源:解读Secondary NameNode的功能

  • 问题:不能迅速的切换,需要花费一定时间恢复。

FaceBook的方案

  • 不改变namenode和datanode整体逻辑的基础上,在其上层开发出AvaterNode,AvatarNode的意思就是支持互相切换。
  • 提供一个Primary Avatar和一个Standby Avatar,通过virual IP来设置IP地址。
  • Primary Avatar对外提供服务,设置了NFS目录,将FSImage和EditLog远程存储。Standby Avatar将NFS目录中的FSImage和EditLog读取过来进行同步,并且设置Standby Avatar一直处于safemode状态,不影响正常操作。这样Standby Avatar相当于一个热拷贝,获得了所有的实时数据。
第三部分:Hadoop0.23 是如何解决的HA

img

  • 提供2台 机器做双机热备
  • 一台为Active 节点,一台为StandBy节点
  • 同时只有Active节点对外提供服务
  • 源数据存储在共享存储
  • StandBy会时刻到共享存储拿Meta信息,以保证切换时不会丢掉数据
  • DataNode会向2台机器汇报自己的信息
  • 仍需要配置Sencondary NameNode接解决Edits log变大问题

来源:Hadoop常见 HA方案 及如何解决HA

others

实际上,尽管很多说法说Hadoop v1中没有namenode的容错处理,但是仍有一些文章:

说明v1中存在较为完善的容错处理方案即NFS和SNN(应用在hadoop-0.20.2上的补丁程序即为facebook的Avatar机制)且v2中有更强大的HA方案解决此单点故障问题。

具体如何,待看官方文档。

操作

版本一

ss

有一个文件FileA,100M大小。Client将FileA写入到HDFS上。HDFS按默认配置。

HDFS分布在三个机架上Rack1,Rack2,Rack3。

a. Client将FileA按64M分块。分成两块,block1和Block2;

b. Client向nameNode发送写数据请求,如图蓝色虚线①------>。

c. NameNode节点,记录block信息。并返回可用的DataNode,如粉色虚线②--------->。

​ Block1: host2,host1,host3

​ Block2: host7,host8,host4

​ 原理:

​ NameNode具有RackAware机架感知功能,这个可以配置。

  • 若client为DataNode节点,那存储block时,规则为:副本1,同client的节点上;副本2,不同机架节点上;副本3,同第二个副本机架的另一个节点上;其他副本随机挑选。
  • 若client不为DataNode节点,那存储block时,规则为:副本1,随机选择一个节点上;副本2,不同副本1,机架上;副本3,同副本2相同的另一个节点上;其他副本随机挑选。

d. client向DataNode发送block1;发送过程是以流式写入。

版本二

  1. 使用HDFS提供的客户端开发库Client,向远程的Namenode发起RPC请求;
  2. Namenode会检查要创建的文件是否已经存在,创建者是否有权限进行操作,成功则会为文件创建一个记录,否则会让客户端抛出异常;
  3. 当客户端开始写入文件的时候,开发库会将文件切分成多个packets,并在内部以数据队列"data queue"的形式管理这些packets,并向Namenode申请新的blocks,获取用来存储replicas的合适的datanodes列表,列表的大小根据在Namenode中对replication的设置而定。
  4. 开始以pipeline(管道)的形式将packet写入所有的replicas中。开发库把packet以流的方式写入第一个datanode,该datanode把该packet存储之后,再将其传递给在此pipeline中的下一个datanode,直到最后一个datanode,这种写数据的方式呈流水线的形式。
  5. 最后一个datanode成功存储之后会返回一个ack packet,在pipeline里传递至客户端,在客户端的开发库内部维护着"ack queue",成功收到datanode返回的ack packet后会从"ack queue"移除相应的packet。
  6. 如果传输过程中,有某个datanode出现了故障,那么当前的pipeline会被关闭,出现故障的datanode会从当前的pipeline中移除,剩余的block会继续剩下的datanode中继续以pipeline的形式传输,同时Namenode会分配一个新的datanode,保持replicas设定的数量。

版本一

s

那么,读操作流程为:

a. client向namenode发送读请求。

b. namenode查看Metadata信息,返回fileA的block的位置。

​ block1:host2,host1,host3

​ block2:host7,host8,host4

c. block的位置是有先后顺序的,先读block1,再读block2。而且block1去host2上读取;然后block2,去host7上读取;

上面例子中,client位于机架外,那么如果client位于机架内某个DataNode上,例如,client是host6。那么读取的时候,遵循的规律是:优选读取本机架上的数据

版本二

  1. 使用HDFS提供的客户端开发库Client,向远程的Namenode发起RPC请求;
  2. Namenode会视情况返回文件的部分或者全部block列表,对于每个block,Namenode都会返回有该block拷贝的DataNode地址;
  3. 客户端开发库Client会选取离客户端最接近的DataNode来读取block;如果客户端本身就是DataNode,那么将从本地直接获取数据.
  4. 读取完当前block的数据后,关闭与当前的DataNode连接,并为读取下一个block寻找最佳的DataNode;
  5. 当读完列表的block后,且文件读取还没有结束,客户端开发库会继续向Namenode获取下一批的block列表。
  6. 读取完一个block都会进行checksum验证,如果读取datanode时出现错误,客户端会通知Namenode,然后再从下一个拥有该block拷贝的datanode继续读。